[AWS IoT Core] MQTT v5 で追加されたユーザープロパティを ルール で取得して Lambda で使用してみました
1 はじめに
CX 事業本部のデリバリー部の平内(SIN)です。
MQTT v5 では、拡張されたプロパティの1つとして「ユーザープロパティ」が定義されており、UTF-8 キーと値のペアで自由に定義することができます。
今回は、AWS IoT Core のルールで実行される Lambda に、このプロパティの内容を渡す要領を確認してみました。
使用したのは、ルールのファンクションとして新設された、get_user_properties(userPropertyKey) です。
https://docs.aws.amazon.com/iot/latest/developerguide/iot-sql-functions.html#iot-sql-function-get-user-properties
ルールの SQL ステートメントで、上記のファンクションを使用して、アクションに Lambda を設定しています。
なお、現時点(2022/12/03)では、AWS で提供される SDK は、MQTT v5 に対応していないため、サンプル作成には、paho.mqtt を使用させて頂きました。
参考:https://dev.classmethod.jp/articles/aws-iot-core-mqtt-v5-sdk-for-python
2 サンプルコード
下記は、動作確認用に作成したクライアントのサンプルコードです。
接続後、ユーザープロパティ「[("key1", "value1"),("key2", "value2"),("key3", "value3")]」を設定したメッセージを1回 Publish するだけのものです。
import time import os import json import ssl import paho.mqtt.client as mqtt from paho.mqtt.properties import Properties from paho.mqtt.packettypes import PacketTypes endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com" port = 8883 dir = os.path.dirname(os.path.abspath(__file__)) certs = { "cafile": "{}/certificates/AmazonRootCA1.pem".format(dir), "certfile": "{}/certificates/client-cert.pem".format(dir), "keyfile": "{}/certificates/private-key.pem".format(dir), } def main(): topic = "sensor/device01" client = mqtt.Client("client_id", protocol = mqtt.MQTTv5) client.tls_set(certs["cafile"], certfile=certs["certfile"], keyfile=certs["keyfile"], cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None) client.connect(endpoint, port, properties = None) client.loop_start() time.sleep(1) payload = json.dumps({"message":"hello"}) properties = Properties(PacketTypes.PUBLISH) properties.UserProperty = [("key1", "value1"),("key2", "value2"),("key3", "value3")] client.publish(topic, payload ,properties = properties) time.sleep(1) if __name__ == "__main__": main()
※ 現時点(2022/12/04)では、AWS で提供される SDK は、MQTT v5 に対応していないため、サンプル作成には、paho.mqtt を使用させて頂きました。
送信すると、MQTT テストクライアントでメッセージが到着していることを確認できます。
3 Rules
続いてルールの設定です。
get_user_properties(userPropertyKey) を使用した SQL ステートメントは、以下のようになっています。
SELECT *, get_user_properties() as userProperty FROM 'sensor/#'
アクションには、Lambda 関数を設定しています。
4 Lambda
ルール のアクションに指定された Lambda のコードです。
最初に event を print して、Lambda 実行時に渡されたパラメータを表示しています。 また、ルールでuserPropertyに設定したユーザープロパティの内容を、パースして表示しています。
import json def lambda_handler(event, context): print("event: {}".format(event)) for property in event["userProperty"]: for key in property.keys(): value = property[key] print("{}:{}".format(key, value))
メッセージが到着して実行された Lambda のログです。
event: {'message': 'hello', 'userProperty': [{'key1': 'value1'}, {'key2': 'value2'}, {'key3': 'value3'}]} key1:value1 key2:value2 key3:value3
クライアントが送ったユーザープロパティの内容が利用できていることを確認できます。
5 get_user_properties()
get_user_properties()については、以下にドキュメントがあります。
ドキュメントに記載されているとおり、get_user_properties の引数にキーを指定することで、一部のプロパティ値を取り出すことも可能です。
SELECT *, get_user_properties('key1') as userProperty FROM 'sensor/#'
event: {'message': 'hello', 'userProperty': ['value1']}
6 最後に
ユーザープロパティを使用すると、既存の Payload に影響を及ぼすことなく、キーペアが自由に定義できます。
今回試したように、その内容を、後段の Lambda 等 に引き継げるとなると、応用範囲も少し広がってくるかも知れません。
なお、AWS のブログでは、get_user_property() でルール上でエンコードしている例が紹介されていました。
7 参考リンク
[AWS IoT Core] MQTT v5 を使用してリクエスト・レスポンス パターンを実装して見ました
[AWS IoT Core] MQTT v5 を使用してユーザープロパティを実装して見ました
[AWS IoT Core] MQTT v5 を使用してトピック・エイリアスを実装して見ました
[AWS IoT Core] MQTT v5 を使用してメッセージ及び、セッション有効期限とクリーンスタートを実装して見ました
[AWS IoT Core] MQTT v5 を使用してレスポンスコードの確認を実装して見ました
[AWS IoT Core] MQTT v5 を使用してフォーマット識別要素で判別する Payload のパースを実装して見ました
[AWS IoT Core] MQTT v5 に対応した 「MQTT テストクライアント」の動作を確認して見ました